/*
 * 作者:熊猫
 * 日期:2017 - 3 - 19 10 - 58 - 37
 *
 */

package net.lainiao.morethread.Chapter7.demo9;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by Administrator on 2017/3/19.
 */
public class MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> {
    private AtomicInteger counter;
    private LinkedBlockingDeque<E> transfered;
    private ReentrantLock lock;

    public MyPriorityTransferQueue() {
        this.counter = new AtomicInteger();
        this.transfered = new LinkedBlockingDeque<E>();
        lock = new ReentrantLock();
    }

    @Override
    public boolean tryTransfer(E e) {
       lock.lock();
       boolean val=false;
       if(counter.get()!=0){
           val=true;
           put(e);
       }
       lock.unlock();
       return val;
    }

    @Override
    public void transfer(E e) throws InterruptedException {
        lock.lock();
        if(counter.get()!=0){
            put(e);
            lock.unlock();
        }else{
            transfered.add(e);
            lock.unlock();
            synchronized (e){
                e.wait();
            }
        }
    }

    @Override
    public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        lock.lock();
        if(counter.get()!=0){
            put(e);
            lock.unlock();
        }else{
            transfered.add(e);
            long newTimeout=TimeUnit.MILLISECONDS.convert(timeout,unit);
            lock.unlock();
            e.wait(newTimeout);
            lock.lock();
            if(transfered.contains(e)){
                transfered.remove(e);
                lock.unlock();
                return false;
            }else{
                lock.unlock();
                return true;
            }
        }
        return false;
    }

    @Override
    public boolean hasWaitingConsumer() {
        return counter.get()!=0;
    }

    @Override
    public int getWaitingConsumerCount() {
        return counter.get();
    }

    @Override
    public E take() throws InterruptedException {
        lock.lock();
        counter.incrementAndGet();
        E value=transfered.poll();
        if(value==null){
            lock.unlock();
            value=super.take();
            lock.lock();
        }else{
            synchronized (value){
                value.notify();
            }
        }
        counter.decrementAndGet();
        lock.unlock();
        return value;
    }
}
